Support async iteration of RecordBatchStream #975

Merged: 5 commits into apache:main, Jan 9, 2025

kylebarron
Contributor

Which issue does this PR close?

Closes #974.

Rationale for this change

Support async iteration of RecordBatchStream.

What changes are included in this PR?

  • Move the raising of StopIteration and StopAsyncIteration into Rust
  • Update typing
  • Add a dependency on pyo3-async-runtimes to manage future conversion
  • Move SendableRecordBatchStream into an Arc<Mutex<>>. I think this is required so that we can clone the stream into future_into_py.
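
As a rough illustration of the two iteration protocols involved, here is a pure-Python sketch. It is not the PR's Rust code: `FakeRecordBatchStream`, `_pull`, and `collect_async` are hypothetical names, and the `threading.Lock` only stands in, by analogy, for the role that `Arc<Mutex<>>` plays in sharing one underlying stream between the sync and async paths.

```python
import asyncio
import threading
from typing import Any, Iterator, List

_DONE = object()  # sentinel marking an exhausted stream


class FakeRecordBatchStream:
    """Hypothetical stand-in for a record batch stream (not the datafusion class)."""

    def __init__(self, batches: List[Any]) -> None:
        self._source: Iterator[Any] = iter(batches)
        # Assumption for illustration: one lock guards the shared source,
        # loosely mirroring a stream held behind Arc<Mutex<...>>.
        self._lock = threading.Lock()

    def _pull(self) -> Any:
        """Return the next batch, or the sentinel when nothing is left."""
        with self._lock:
            return next(self._source, _DONE)

    # Synchronous iterator protocol: exhaustion is signalled by StopIteration.
    def __iter__(self) -> "FakeRecordBatchStream":
        return self

    def __next__(self) -> Any:
        batch = self._pull()
        if batch is _DONE:
            raise StopIteration
        return batch

    def next(self) -> Any:
        return self.__next__()

    # Asynchronous iterator protocol: exhaustion is signalled by StopAsyncIteration.
    def __aiter__(self) -> "FakeRecordBatchStream":
        return self

    async def __anext__(self) -> Any:
        # Pull in a worker thread so the event loop is not blocked.
        batch = await asyncio.to_thread(self._pull)
        if batch is _DONE:
            raise StopAsyncIteration
        return batch


async def collect_async(stream: FakeRecordBatchStream) -> List[Any]:
    """Drain a stream with `async for` and return what it produced."""
    return [batch async for batch in stream]


if __name__ == "__main__":
    print(list(FakeRecordBatchStream(["b0", "b1"])))
    print(asyncio.run(collect_async(FakeRecordBatchStream(["b0", "b1"]))))
```

Note that the async path has to raise StopAsyncIteration rather than StopIteration: a StopIteration escaping a coroutine is converted into a RuntimeError by Python.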

Are there any user-facing changes?

Adds async iterator support.

@kylebarron
Contributor Author

If this looks good, we can add a test using pytest-asyncio
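
A test along these lines might look like the sketch below. It is only an assumption about the shape such a test could take: `fake_stream` is a hypothetical async generator standing in for the real stream, and the test functions are driven with `asyncio.run` so the snippet runs without pytest installed. Under pytest-asyncio the coroutines would instead carry the `@pytest.mark.asyncio` marker (or be collected in the plugin's auto mode).

```python
import asyncio
from typing import AsyncIterator, List


async def fake_stream(batches: List[str]) -> AsyncIterator[str]:
    """Hypothetical stand-in for an async-iterable record batch stream."""
    for batch in batches:
        await asyncio.sleep(0)  # hand control back to the event loop between batches
        yield batch


# With pytest-asyncio, mark this coroutine with @pytest.mark.asyncio.
async def test_async_iteration_yields_all_batches() -> None:
    seen = [batch async for batch in fake_stream(["batch-0", "batch-1"])]
    assert seen == ["batch-0", "batch-1"]


# With pytest-asyncio, mark this coroutine with @pytest.mark.asyncio.
async def test_anext_raises_when_exhausted() -> None:
    stream = fake_stream([])
    try:
        await stream.__anext__()
    except StopAsyncIteration:
        return
    raise AssertionError("expected StopAsyncIteration")


if __name__ == "__main__":
    # Stand-in for the pytest runner, so the sketch is runnable on its own.
    asyncio.run(test_async_iteration_yields_all_batches())
    asyncio.run(test_anext_raises_when_exhausted())
```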

@timsaucer
Contributor

At first glance, this looks like a very nice add. Do you need help resolving the test failures?

@kylebarron
Contributor Author

This PR changes the behavior of stream.next() to raise StopIteration when there are no more batches available in the stream.

This matches default iterator behavior: calling next() on an exhausted iterator raises StopIteration.
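
For reference, the default iterator behavior can be reproduced with a plain list iterator. The helper name `drain_with_next` below is made up for illustration; it shows the loop pattern that a StopIteration-raising next() makes possible.

```python
from typing import Any, Iterable, List


def drain_with_next(items: Iterable[Any]) -> List[Any]:
    """Consume an iterable by calling next() until StopIteration is raised."""
    it = iter(items)
    collected = []
    while True:
        try:
            collected.append(next(it))
        except StopIteration:
            # Exhaustion is reported by an exception, not by returning None.
            break
    return collected


if __name__ == "__main__":
    print(drain_with_next([1, 2, 3]))
```

This is the same contract that a `for` loop relies on internally, which is why a stream whose next() raises StopIteration behaves like any other Python iterator.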

This also matches pyarrow.RecordBatchReader behavior:

In [11]: import pyarrow as pa

In [12]: batch = pa.record_batch([])

In [13]: batch2 = pa.record_batch([])

In [15]: reader = pa.RecordBatchReader.from_batches(batch.schema, [batch, batch2])

In [16]: reader.read_next_batch()
Out[16]:
pyarrow.RecordBatch

----

In [17]: reader.read_next_batch()
Out[17]:
pyarrow.RecordBatch

----

In [18]: reader.read_next_batch()
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
Cell In[18], line 1
----> 1 reader.read_next_batch()

File ~/.pyenv/versions/3.11.8/lib/python3.11/site-packages/pyarrow/ipc.pxi:708, in pyarrow.lib.RecordBatchReader.read_next_batch()

StopIteration:

@kylebarron
Contributor Author

I'd like to add a test with pytest-asyncio, but I don't know how to add the dependency; see #977.

Contributor

@timsaucer left a comment

Now that I've had a chance to actually review, this looks great. Can we open an issue to track adding the pytest-asyncio unit test once package management has moved to uv?

python/datafusion/record_batch.py (outdated, resolved)

@timsaucer merged commit 4b262be into apache:main on Jan 9, 2025
23 checks passed
@kylebarron
Contributor Author

Thanks for making that update!

Successfully merging this pull request may close these issues.

Add async iterator support to RecordBatchStream